tokio\sync\mpsc/unbounded.rs
1use crate::loom::sync::{atomic::AtomicUsize, Arc};
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the [`unbounded_channel`] function.
///
/// The handle implements `Clone`; a clone wraps a clone of the same
/// underlying `chan::Tx`.
pub struct UnboundedSender<T> {
    /// The channel sender, parameterised with the unbounded [`Semaphore`]
    /// message counter.
    chan: chan::Tx<T, Semaphore>,
}
14
/// An unbounded sender that does not prevent the channel from being closed.
///
/// If all [`UnboundedSender`] instances of a channel were dropped and only
/// `WeakUnboundedSender` instances remain, the channel is closed.
///
/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
///
/// [`UnboundedSender`]: UnboundedSender
/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc::unbounded_channel;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, _rx) = unbounded_channel::<i32>();
/// let tx_weak = tx.downgrade();
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.upgrade().is_some());
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// # }
/// ```
pub struct WeakUnboundedSender<T> {
    /// Shared channel state, held directly rather than through a `chan::Tx`.
    /// The `Clone` and `Drop` impls of this type adjust the channel's weak
    /// count by hand around this pointer.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
48
49impl<T> Clone for UnboundedSender<T> {
50 fn clone(&self) -> Self {
51 UnboundedSender {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57impl<T> fmt::Debug for UnboundedSender<T> {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("UnboundedSender")
60 .field("chan", &self.chan)
61 .finish()
62 }
63}
64
/// Receive values from the associated `UnboundedSender`.
///
/// Instances are created by the [`unbounded_channel`] function.
///
/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
///
/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
pub struct UnboundedReceiver<T> {
    /// The channel receiver, parameterised with the unbounded [`Semaphore`]
    /// message counter. Every receive operation on this type forwards to it.
    chan: chan::Rx<T, Semaphore>,
}
76
77impl<T> fmt::Debug for UnboundedReceiver<T> {
78 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
79 fmt.debug_struct("UnboundedReceiver")
80 .field("chan", &self.chan)
81 .finish()
82 }
83}
84
85/// Creates an unbounded mpsc channel for communicating between asynchronous
86/// tasks without backpressure.
87///
88/// A `send` on this channel will always succeed as long as the receive half has
89/// not been closed. If the receiver falls behind, messages will be arbitrarily
90/// buffered.
91///
92/// **Note** that the amount of available system memory is an implicit bound to
93/// the channel. Using an `unbounded` channel has the ability of causing the
94/// process to run out of memory. In this case, the process will be aborted.
95pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
96 let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
97
98 let tx = UnboundedSender::new(tx);
99 let rx = UnboundedReceiver::new(rx);
100
101 (tx, rx)
102}
103
/// Message counter used in place of a capacity-limiting semaphore; an
/// unbounded channel has no capacity to hand out.
///
/// `UnboundedSender::send` adds 2 to the wrapped value for every message it
/// accepts, and refuses to send once the lowest bit of the value is set.
// NOTE(review): presumably the low bit is the "closed" flag set by the channel
// implementation and the count is decreased on receive -- confirm in `chan`.
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107
108impl<T> UnboundedReceiver<T> {
109 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
110 UnboundedReceiver { chan }
111 }
112
113 /// Receives the next value for this receiver.
114 ///
115 /// This method returns `None` if the channel has been closed and there are
116 /// no remaining messages in the channel's buffer. This indicates that no
117 /// further values can ever be received from this `Receiver`. The channel is
118 /// closed when all senders have been dropped, or when [`close`] is called.
119 ///
120 /// If there are no messages in the channel's buffer, but the channel has
121 /// not yet been closed, this method will sleep until a message is sent or
122 /// the channel is closed.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. If `recv` is used as the event in a
127 /// [`tokio::select!`](crate::select) statement and some other branch
128 /// completes first, it is guaranteed that no messages were received on this
129 /// channel.
130 ///
131 /// [`close`]: Self::close
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::mpsc;
137 ///
138 /// # #[tokio::main(flavor = "current_thread")]
139 /// # async fn main() {
140 /// let (tx, mut rx) = mpsc::unbounded_channel();
141 ///
142 /// tokio::spawn(async move {
143 /// tx.send("hello").unwrap();
144 /// });
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(None, rx.recv().await);
148 /// # }
149 /// ```
150 ///
151 /// Values are buffered:
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// # #[tokio::main(flavor = "current_thread")]
157 /// # async fn main() {
158 /// let (tx, mut rx) = mpsc::unbounded_channel();
159 ///
160 /// tx.send("hello").unwrap();
161 /// tx.send("world").unwrap();
162 ///
163 /// assert_eq!(Some("hello"), rx.recv().await);
164 /// assert_eq!(Some("world"), rx.recv().await);
165 /// # }
166 /// ```
167 pub async fn recv(&mut self) -> Option<T> {
168 use std::future::poll_fn;
169
170 poll_fn(|cx| self.poll_recv(cx)).await
171 }
172
173 /// Receives the next values for this receiver and extends `buffer`.
174 ///
175 /// This method extends `buffer` by no more than a fixed number of values
176 /// as specified by `limit`. If `limit` is zero, the function returns
177 /// immediately with `0`. The return value is the number of values added to
178 /// `buffer`.
179 ///
180 /// For `limit > 0`, if there are no messages in the channel's queue,
181 /// but the channel has not yet been closed, this method will sleep
182 /// until a message is sent or the channel is closed.
183 ///
184 /// For non-zero values of `limit`, this method will never return `0` unless
185 /// the channel has been closed and there are no remaining messages in the
186 /// channel's queue. This indicates that no further values can ever be
187 /// received from this `Receiver`. The channel is closed when all senders
188 /// have been dropped, or when [`close`] is called.
189 ///
190 /// The capacity of `buffer` is increased as needed.
191 ///
192 /// # Cancel safety
193 ///
194 /// This method is cancel safe. If `recv_many` is used as the event in a
195 /// [`tokio::select!`](crate::select) statement and some other branch
196 /// completes first, it is guaranteed that no messages were received on this
197 /// channel.
198 ///
199 /// [`close`]: Self::close
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::sync::mpsc;
205 ///
206 /// # #[tokio::main(flavor = "current_thread")]
207 /// # async fn main() {
208 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
209 /// let limit = 2;
210 /// let (tx, mut rx) = mpsc::unbounded_channel();
211 /// let tx2 = tx.clone();
212 /// tx2.send("first").unwrap();
213 /// tx2.send("second").unwrap();
214 /// tx2.send("third").unwrap();
215 ///
216 /// // Call `recv_many` to receive up to `limit` (2) values.
217 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
218 /// assert_eq!(vec!["first", "second"], buffer);
219 ///
220 /// // If the buffer is full, the next call to `recv_many`
221 /// // reserves additional capacity.
222 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
223 ///
224 /// tokio::spawn(async move {
225 /// tx.send("fourth").unwrap();
226 /// });
227 ///
228 /// // 'tx' is dropped, but `recv_many`
229 /// // is guaranteed not to return 0 as the channel
230 /// // is not yet closed.
231 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
232 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
233 ///
234 /// // Once the last sender is dropped, the channel is
235 /// // closed and `recv_many` returns 0, capacity unchanged.
236 /// drop(tx2);
237 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
238 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
239 /// # }
240 /// ```
241 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
242 use std::future::poll_fn;
243 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
244 }
245
246 /// Tries to receive the next value for this receiver.
247 ///
/// This method returns the [`Empty`] error if the channel is currently
/// empty, but there are still outstanding [senders].
///
/// This method returns the [`Disconnected`] error if the channel is
/// currently empty, and there are no outstanding [senders].
///
/// Unlike the [`poll_recv`] method, this method will never return an
/// [`Empty`] error spuriously.
///
/// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
/// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
/// [`poll_recv`]: Self::poll_recv
/// [senders]: crate::sync::mpsc::UnboundedSender
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use tokio::sync::mpsc;
267 /// use tokio::sync::mpsc::error::TryRecvError;
268 ///
269 /// # #[tokio::main(flavor = "current_thread")]
270 /// # async fn main() {
271 /// let (tx, mut rx) = mpsc::unbounded_channel();
272 ///
273 /// tx.send("hello").unwrap();
274 ///
275 /// assert_eq!(Ok("hello"), rx.try_recv());
276 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
277 ///
278 /// tx.send("hello").unwrap();
279 /// // Drop the last sender, closing the channel.
280 /// drop(tx);
281 ///
282 /// assert_eq!(Ok("hello"), rx.try_recv());
283 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
284 /// # }
285 /// ```
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
    // Non-blocking attempt, answered entirely by the underlying `chan::Rx`.
    self.chan.try_recv()
}
289
290 /// Blocking receive to call outside of asynchronous contexts.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if called within an asynchronous execution
295 /// context.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// # #[cfg(not(target_family = "wasm"))]
301 /// # {
302 /// use std::thread;
303 /// use tokio::sync::mpsc;
304 ///
305 /// #[tokio::main]
306 /// async fn main() {
307 /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
308 ///
309 /// let sync_code = thread::spawn(move || {
310 /// assert_eq!(Some(10), rx.blocking_recv());
311 /// });
312 ///
313 /// let _ = tx.send(10);
314 /// sync_code.join().unwrap();
315 /// }
316 /// # }
317 /// ```
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
    // Drive the async `recv` future to completion with the crate's `block_on`.
    crate::future::block_on(self.recv())
}
324
325 /// Variant of [`Self::recv_many`] for blocking contexts.
326 ///
327 /// The same conditions as in [`Self::blocking_recv`] apply.
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
    // Drive the async `recv_many` future to completion with the crate's `block_on`.
    crate::future::block_on(self.recv_many(buffer, limit))
}
334
335 /// Closes the receiving half of a channel, without dropping it.
336 ///
337 /// This prevents any further messages from being sent on the channel while
338 /// still enabling the receiver to drain messages that are buffered.
339 ///
340 /// To guarantee that no messages are dropped, after calling `close()`,
341 /// `recv()` must be called until `None` is returned.
pub fn close(&mut self) {
    // Closing is handled by the underlying `chan::Rx`; the receiver itself stays usable.
    self.chan.close();
}
345
346 /// Checks if a channel is closed.
347 ///
348 /// This method returns `true` if the channel has been closed. The channel is closed
349 /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
350 ///
351 /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
352 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
353 ///
354 /// # Examples
355 /// ```
356 /// use tokio::sync::mpsc;
357 ///
358 /// # #[tokio::main(flavor = "current_thread")]
359 /// # async fn main() {
360 /// let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
361 /// assert!(!rx.is_closed());
362 ///
363 /// rx.close();
364 ///
365 /// assert!(rx.is_closed());
366 /// # }
367 /// ```
pub fn is_closed(&self) -> bool {
    // The closed state is tracked by the underlying `chan::Rx`.
    self.chan.is_closed()
}
371
372 /// Checks if a channel is empty.
373 ///
374 /// This method returns `true` if the channel has no messages.
375 ///
376 /// # Examples
377 /// ```
378 /// use tokio::sync::mpsc;
379 ///
380 /// # #[tokio::main(flavor = "current_thread")]
381 /// # async fn main() {
382 /// let (tx, rx) = mpsc::unbounded_channel();
383 /// assert!(rx.is_empty());
384 ///
385 /// tx.send(0).unwrap();
386 /// assert!(!rx.is_empty());
387 /// # }
388 ///
389 /// ```
pub fn is_empty(&self) -> bool {
    // Answered by the underlying `chan::Rx`.
    self.chan.is_empty()
}
393
394 /// Returns the number of messages in the channel.
395 ///
396 /// # Examples
397 /// ```
398 /// use tokio::sync::mpsc;
399 ///
400 /// # #[tokio::main(flavor = "current_thread")]
401 /// # async fn main() {
402 /// let (tx, rx) = mpsc::unbounded_channel();
403 /// assert_eq!(0, rx.len());
404 ///
405 /// tx.send(0).unwrap();
406 /// assert_eq!(1, rx.len());
407 /// # }
408 /// ```
pub fn len(&self) -> usize {
    // Answered by the underlying `chan::Rx`.
    self.chan.len()
}
412
413 /// Polls to receive the next message on this channel.
414 ///
415 /// This method returns:
416 ///
417 /// * `Poll::Pending` if no messages are available but the channel is not
418 /// closed, or if a spurious failure happens.
419 /// * `Poll::Ready(Some(message))` if a message is available.
420 /// * `Poll::Ready(None)` if the channel has been closed and all messages
421 /// sent before it was closed have been received.
422 ///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent by any
/// sender, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
428 ///
429 /// If this method returns `Poll::Pending` due to a spurious failure, then
430 /// the `Waker` will be notified when the situation causing the spurious
431 /// failure has been resolved. Note that receiving such a wakeup does not
432 /// guarantee that the next call will succeed — it could fail with another
433 /// spurious failure.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
    // Poll-based receive on the underlying `chan::Rx`; `recv` wraps this in a future.
    self.chan.recv(cx)
}
437
438 /// Polls to receive multiple messages on this channel, extending the provided buffer.
439 ///
440 /// This method returns:
441 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
442 /// spurious failure happens.
443 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
444 /// stored in `buffer`. This can be less than, or equal to, `limit`.
445 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
446 ///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent by any
/// sender, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
452 ///
453 /// Note that this method does not guarantee that exactly `limit` messages
454 /// are received. Rather, if at least one message is available, it returns
455 /// as many messages as it can up to the given limit. This method returns
456 /// zero only if the channel is closed (or if `limit` is zero).
457 ///
458 /// # Examples
459 ///
460 /// ```
461 /// # #[cfg(not(target_family = "wasm"))]
462 /// # {
463 /// use std::task::{Context, Poll};
464 /// use std::pin::Pin;
465 /// use tokio::sync::mpsc;
466 /// use futures::Future;
467 ///
468 /// struct MyReceiverFuture<'a> {
469 /// receiver: mpsc::UnboundedReceiver<i32>,
470 /// buffer: &'a mut Vec<i32>,
471 /// limit: usize,
472 /// }
473 ///
474 /// impl<'a> Future for MyReceiverFuture<'a> {
475 /// type Output = usize; // Number of messages received
476 ///
477 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
478 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
479 ///
480 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
481 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
482 /// Poll::Pending => Poll::Pending,
483 /// Poll::Ready(count) => Poll::Ready(count),
484 /// }
485 /// }
486 /// }
487 ///
488 /// # #[tokio::main(flavor = "current_thread")]
489 /// # async fn main() {
490 /// let (tx, rx) = mpsc::unbounded_channel::<i32>();
491 /// let mut buffer = Vec::new();
492 ///
493 /// let my_receiver_future = MyReceiverFuture {
494 /// receiver: rx,
495 /// buffer: &mut buffer,
496 /// limit: 3,
497 /// };
498 ///
499 /// for i in 0..10 {
500 /// tx.send(i).expect("Unable to send integer");
501 /// }
502 ///
503 /// let count = my_receiver_future.await;
504 /// assert_eq!(count, 3);
505 /// assert_eq!(buffer, vec![0,1,2])
506 /// # }
507 /// # }
508 /// ```
pub fn poll_recv_many(
    &mut self,
    cx: &mut Context<'_>,
    buffer: &mut Vec<T>,
    limit: usize,
) -> Poll<usize> {
    // Poll-based batch receive on the underlying `chan::Rx`; `recv_many`
    // makes this same call from inside a future.
    self.chan.recv_many(cx, buffer, limit)
}
517
518 /// Returns the number of [`UnboundedSender`] handles.
pub fn sender_strong_count(&self) -> usize {
    // The count is maintained by the underlying `chan::Rx`.
    self.chan.sender_strong_count()
}
522
523 /// Returns the number of [`WeakUnboundedSender`] handles.
pub fn sender_weak_count(&self) -> usize {
    // The count is maintained by the underlying `chan::Rx`.
    self.chan.sender_weak_count()
}
527}
528
529impl<T> UnboundedSender<T> {
530 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
531 UnboundedSender { chan }
532 }
533
534 /// Attempts to send a message on this `UnboundedSender` without blocking.
535 ///
536 /// This method is not marked as `async` because sending a message to an unbounded channel
537 /// never requires any form of waiting. This is due to the channel's infinite capacity,
538 /// allowing the `send` operation to complete immediately. As a result, the `send` method can be
539 /// used in both synchronous and asynchronous code without issues.
540 ///
541 /// If the receive half of the channel is closed, either due to [`close`]
542 /// being called or the [`UnboundedReceiver`] having been dropped, this
543 /// function returns an error. The error includes the value passed to `send`.
544 ///
545 /// [`close`]: UnboundedReceiver::close
546 /// [`UnboundedReceiver`]: UnboundedReceiver
547 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
548 if !self.inc_num_messages() {
549 return Err(SendError(message));
550 }
551
552 self.chan.send(message);
553 Ok(())
554 }
555
556 fn inc_num_messages(&self) -> bool {
557 use std::process;
558 use std::sync::atomic::Ordering::{AcqRel, Acquire};
559
560 let mut curr = self.chan.semaphore().0.load(Acquire);
561
562 loop {
563 if curr & 1 == 1 {
564 return false;
565 }
566
567 if curr == usize::MAX ^ 1 {
568 // Overflowed the ref count. There is no safe way to recover, so
569 // abort the process. In practice, this should never happen.
570 process::abort()
571 }
572
573 match self
574 .chan
575 .semaphore()
576 .0
577 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
578 {
579 Ok(_) => return true,
580 Err(actual) => {
581 curr = actual;
582 }
583 }
584 }
585 }
586
587 /// Completes when the receiver has dropped.
588 ///
589 /// This allows the producers to get notified when interest in the produced
590 /// values is canceled and immediately stop doing work.
591 ///
592 /// # Cancel safety
593 ///
594 /// This method is cancel safe. Once the channel is closed, it stays closed
595 /// forever and all future calls to `closed` will return immediately.
596 ///
597 /// # Examples
598 ///
599 /// ```
600 /// use tokio::sync::mpsc;
601 ///
602 /// # #[tokio::main(flavor = "current_thread")]
603 /// # async fn main() {
604 /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
605 /// let tx2 = tx1.clone();
606 /// let tx3 = tx1.clone();
607 /// let tx4 = tx1.clone();
608 /// let tx5 = tx1.clone();
609 /// tokio::spawn(async move {
610 /// drop(rx);
611 /// });
612 ///
613 /// futures::join!(
614 /// tx1.closed(),
615 /// tx2.closed(),
616 /// tx3.closed(),
617 /// tx4.closed(),
618 /// tx5.closed()
619 /// );
620 /// println!("Receiver dropped");
621 /// # }
622 /// ```
pub async fn closed(&self) {
    // Waits on the future provided by the underlying `chan::Tx`.
    self.chan.closed().await;
}
626
627 /// Checks if the channel has been closed. This happens when the
628 /// [`UnboundedReceiver`] is dropped, or when the
629 /// [`UnboundedReceiver::close`] method is called.
630 ///
631 /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
632 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
633 ///
634 /// ```
635 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
636 /// assert!(!tx.is_closed());
637 ///
638 /// let tx2 = tx.clone();
639 /// assert!(!tx2.is_closed());
640 ///
641 /// drop(rx);
642 /// assert!(tx.is_closed());
643 /// assert!(tx2.is_closed());
644 /// ```
pub fn is_closed(&self) -> bool {
    // The closed state is tracked by the underlying `chan::Tx`.
    self.chan.is_closed()
}
648
649 /// Returns `true` if senders belong to the same channel.
650 ///
651 /// # Examples
652 ///
653 /// ```
654 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
655 /// let tx2 = tx.clone();
656 /// assert!(tx.same_channel(&tx2));
657 ///
658 /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
659 /// assert!(!tx3.same_channel(&tx2));
660 /// ```
pub fn same_channel(&self, other: &Self) -> bool {
    // The comparison is made between the two wrapped `chan::Tx` handles.
    self.chan.same_channel(&other.chan)
}
664
665 /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
666 /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
667 /// channel were dropped and only `WeakUnboundedSender` instances remain,
668 /// the channel is closed.
669 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
670 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
671 WeakUnboundedSender {
672 chan: self.chan.downgrade(),
673 }
674 }
675
676 /// Returns the number of [`UnboundedSender`] handles.
pub fn strong_count(&self) -> usize {
    // The count is maintained by the underlying `chan::Tx`.
    self.chan.strong_count()
}
680
681 /// Returns the number of [`WeakUnboundedSender`] handles.
pub fn weak_count(&self) -> usize {
    // The count is maintained by the underlying `chan::Tx`.
    self.chan.weak_count()
}
685}
686
687impl<T> Clone for WeakUnboundedSender<T> {
688 fn clone(&self) -> Self {
689 self.chan.increment_weak_count();
690
691 WeakUnboundedSender {
692 chan: self.chan.clone(),
693 }
694 }
695}
696
impl<T> Drop for WeakUnboundedSender<T> {
    /// Unregisters this handle from the channel's weak count, mirroring the
    /// increment performed in `Clone`.
    fn drop(&mut self) {
        self.chan.decrement_weak_count();
    }
}
702
703impl<T> WeakUnboundedSender<T> {
704 /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
705 /// This will return `Some` if there are other `Sender` instances alive and
706 /// the channel wasn't previously dropped, otherwise `None` is returned.
707 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
708 chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
709 }
710
711 /// Returns the number of [`UnboundedSender`] handles.
712 pub fn strong_count(&self) -> usize {
713 self.chan.strong_count()
714 }
715
716 /// Returns the number of [`WeakUnboundedSender`] handles.
717 pub fn weak_count(&self) -> usize {
718 self.chan.weak_count()
719 }
720}
721
722impl<T> fmt::Debug for WeakUnboundedSender<T> {
723 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
724 fmt.debug_struct("WeakUnboundedSender").finish()
725 }
726}